Laravel Broadcast——广播系统源码剖析

前言

在现代的 web 应用程序中,WebSockets 被用来实现需要实时、即时更新的接口。当服务器上的数据被更新后,更新信息将通过 WebSocket 连接发送到客户端等待处理。相比于不停地轮询应用程序,WebSocket 是一种更加可靠和高效的选择。

我们先用一个电子商务网站作为例子来概览一下事件广播。当用户在查看自己的订单时,我们不希望他们必须通过刷新页面才能看到状态更新。我们希望一旦有更新时就主动将更新信息广播到客户端。

laravel 的广播系统和队列系统类似,需要两个进程协作,一个是 laravelweb 后台系统,另一个是 Socket.IO 服务器系统。具体的流程是页面加载时,网页 js 程序 Laravel EchoSocket.IO 服务器建立连接, laravel 发起通过驱动发布广播,Socket.IO 服务器接受广播内容,对连接的客户端网页推送信息,以达到网页实时更新的目的。

laravel 发起广播的方式有两种,redispusher。对于 redis 来说,需要支持 Socket.IO 服务器系统,官方推荐 nodejs 为底层的 tlaverdure/laravel-echo-server。对于 pusher 来说,该第三方服务包含了驱动与 Socket.IO 服务器。

本文将会介绍 redis 为驱动的广播源码,由于 laravel-echo-servernodejs 编写,本文也无法介绍 Socket.IO 方面的内容。

广播系统服务的启动

和其他服务类似,广播系统服务的注册实质上就是对 Ioc 容器注册门面类,广播系统的门面类是 BroadcastManager:

  1. class BroadcastServiceProvider extends ServiceProvider
  2. {
  3. public function register()
  4. {
  5. $this->app->singleton(BroadcastManager::class, function ($app) {
  6. return new BroadcastManager($app);
  7. });
  8. $this->app->singleton(BroadcasterContract::class, function ($app) {
  9. return $app->make(BroadcastManager::class)->connection();
  10. });
  11. $this->app->alias(
  12. BroadcastManager::class, BroadcastingFactory::class
  13. );
  14. }
  15. }

除了注册 BroadcastManagerBroadcastServiceProvider 还进行了广播驱动的启动:

  1. public function connection($driver = null)
  2. {
  3. return $this->driver($driver);
  4. }
  5. public function driver($name = null)
  6. {
  7. $name = $name ?: $this->getDefaultDriver();
  8. return $this->drivers[$name] = $this->get($name);
  9. }
  10. protected function get($name)
  11. {
  12. return isset($this->drivers[$name]) ? $this->drivers[$name] : $this->resolve($name);
  13. }
  14. protected function resolve($name)
  15. {
  16. $config = $this->getConfig($name);
  17. if (is_null($config)) {
  18. throw new InvalidArgumentException("Broadcaster [{$name}] is not defined.");
  19. }
  20. if (isset($this->customCreators[$config['driver']])) {
  21. return $this->callCustomCreator($config);
  22. }
  23. $driverMethod = 'create'.ucfirst($config['driver']).'Driver';
  24. if (! method_exists($this, $driverMethod)) {
  25. throw new InvalidArgumentException("Driver [{$config['driver']}] is not supported.");
  26. }
  27. return $this->{$driverMethod}($config);
  28. }
  29. protected function createRedisDriver(array $config)
  30. {
  31. return new RedisBroadcaster(
  32. $this->app->make('redis'), Arr::get($config, 'connection')
  33. );
  34. }

广播信息的发布

广播信息的发布与事件的发布大致相同,要告知 Laravel 一个给定的事件是广播类型,只需在事件类中实现 Illuminate\Contracts\Broadcasting\ShouldBroadcast 接口即可。该接口已经被导入到所有由框架生成的事件类中,所以可以很方便地将它添加到自己的事件中。

ShouldBroadcast 接口要求你实现一个方法:broadcastOn. broadcastOn 方法返回一个频道或一个频道数组,事件会被广播到这些频道。频道必须是 ChannelPrivateChannelPresenceChannel 的实例。Channel 实例表示任何用户都可以订阅的公开频道,而 PrivateChannelsPresenceChannels 则表示需要 频道授权 的私有频道:

  1. class ServerCreated implements ShouldBroadcast
  2. {
  3. use SerializesModels;
  4. public $user;
  5. //默认情况下,每一个广播事件都被添加到默认的队列上,默认的队列连接在 queue.php 配置文件中指定。可以通过在事件类中定义一个 broadcastQueue 属性来自定义广播器使用的队列。该属性用于指定广播使用的队列名称:
  6. public $broadcastQueue = 'your-queue-name';
  7. public function __construct(User $user)
  8. {
  9. $this->user = $user;
  10. }
  11. public function broadcastOn()
  12. {
  13. return new PrivateChannel('user.'.$this->user->id);
  14. }
  15. //Laravel 默认会使用事件的类名作为广播名称来广播事件,自定义:
  16. public function broadcastAs()
  17. {
  18. return 'server.created';
  19. }
  20. //想更细粒度地控制广播数据:
  21. public function broadcastWith()
  22. {
  23. return ['id' => $this->user->id];
  24. }
  25. //有时,想在给定条件为 true ,才广播事件:
  26. public function broadcastWhen()
  27. {
  28. return $this->value > 100;
  29. }
  30. }

然后,只需要像平时那样触发事件。一旦事件被触发,一个队列任务会自动广播事件到你指定的广播驱动器上。

当一个事件被广播时,它所有的 public 属性会自动被序列化为广播数据,这允许你在你的 JavaScript 应用中访问事件的公有数据。因此,举个例子,如果你的事件有一个公有的 $user 属性,它包含了一个 Elouqent 模型,那么事件的广播数据会是:

  1. {
  2. "user": {
  3. "id": 1,
  4. "name": "Patrick Stewart"
  5. ...
  6. }
  7. }

广播发布的源码

广播的发布与事件的触发是一体的,具体的流程我们已经在 event 的源码中介绍清楚了,现在我们来看唯一的不同:

  1. public function dispatch($event, $payload = [], $halt = false)
  2. {
  3. list($event, $payload) = $this->parseEventAndPayload(
  4. $event, $payload
  5. );
  6. if ($this->shouldBroadcast($payload)) {
  7. $this->broadcastEvent($payload[0]);
  8. }
  9. ...
  10. }
  11. protected function shouldBroadcast(array $payload)
  12. {
  13. return isset($payload[0]) && $payload[0] instanceof ShouldBroadcast;
  14. }
  15. protected function broadcastEvent($event)
  16. {
  17. $this->container->make(BroadcastFactory::class)->queue($event);
  18. }

可见,关键之处在于 BroadcastManagerquene 方法:

  1. public function queue($event)
  2. {
  3. $connection = $event instanceof ShouldBroadcastNow ? 'sync' : null;
  4. if (is_null($connection) && isset($event->connection)) {
  5. $connection = $event->connection;
  6. }
  7. $queue = null;
  8. if (isset($event->broadcastQueue)) {
  9. $queue = $event->broadcastQueue;
  10. } elseif (isset($event->queue)) {
  11. $queue = $event->queue;
  12. }
  13. $this->app->make('queue')->connection($connection)->pushOn(
  14. $queue, new BroadcastEvent(clone $event)
  15. );
  16. }

可见,quene 方法将广播事件包装为事件类,并且通过队列发布,我们接下来看这个事件类的处理:

  1. class BroadcastEvent implements ShouldQueue
  2. {
  3. public function handle(Broadcaster $broadcaster)
  4. {
  5. $name = method_exists($this->event, 'broadcastAs')
  6. ? $this->event->broadcastAs() : get_class($this->event);
  7. $broadcaster->broadcast(
  8. array_wrap($this->event->broadcastOn()), $name,
  9. $this->getPayloadFromEvent($this->event)
  10. );
  11. }
  12. protected function getPayloadFromEvent($event)
  13. {
  14. if (method_exists($event, 'broadcastWith')) {
  15. return array_merge(
  16. $event->broadcastWith(), ['socket' => data_get($event, 'socket')]
  17. );
  18. }
  19. $payload = [];
  20. foreach ((new ReflectionClass($event))->getProperties(ReflectionProperty::IS_PUBLIC) as $property) {
  21. $payload[$property->getName()] = $this->formatProperty($property->getValue($event));
  22. }
  23. return $payload;
  24. }
  25. protected function formatProperty($value)
  26. {
  27. if ($value instanceof Arrayable) {
  28. return $value->toArray();
  29. }
  30. return $value;
  31. }
  32. }

可见该事件主要调用 broadcasterbroadcast 方法,我们这里讲 redis 的发布:

  1. class RedisBroadcaster extends Broadcaster
  2. {
  3. public function broadcast(array $channels, $event, array $payload = [])
  4. {
  5. $connection = $this->redis->connection($this->connection);
  6. $payload = json_encode([
  7. 'event' => $event,
  8. 'data' => $payload,
  9. 'socket' => Arr::pull($payload, 'socket'),
  10. ]);
  11. foreach ($this->formatChannels($channels) as $channel) {
  12. $connection->publish($channel, $payload);
  13. }
  14. }
  15. }
  16. protected function formatChannels(array $channels)
  17. {
  18. return array_map(function ($channel) {
  19. return (string) $channel;
  20. }, $channels);
  21. }

broadcast 方法运用了 redispublish 方法,对 redis 进行了频道的信息发布。

频道授权

对于私有频道,用户只有被授权后才能监听。实现过程是用户向 Laravel 应用程序发起一个携带频道名称的 HTTP 请求,应用程序判断该用户是否能够监听该频道。在使用 Laravel Echo 时,上述 HTTP 请求会被自动发送;尽管如此,仍然需要定义适当的路由来响应这些请求。

定义授权路由

我们可以在 Laravel 里很容易地定义路由来响应频道授权请求。

  1. Broadcast::routes();

Broadcast::routes 方法会自动把它的路由放进 web 中间件组中;另外,如果你想对一些属性自定义,可以向该方法传递一个包含路由属性的数组

  1. Broadcast::routes($attributes);

定义授权回调

接下来,我们需要定义真正用于处理频道授权的逻辑。这是在 routes/channels.php 文件中完成。在该文件中,你可以用 Broadcast::channel 方法来注册频道授权回调函数:

  1. Broadcast::channel('order.{orderId}', function ($user, $orderId) {
  2. return $user->id === Order::findOrNew($orderId)->user_id;
  3. });

channel 方法接收两个参数:频道名称和一个回调函数,该回调通过返回 truefalse 来表示用户是否被授权监听该频道。

所有的授权回调接收当前被认证的用户作为第一个参数,任何额外的通配符参数作为后续参数。在本例中,我们使用 {orderId} 占位符来表示频道名称的「ID」部分是通配符。

授权回调模型绑定

就像 HTTP 路由一样,频道路由也可以利用显式或隐式 路由模型绑定。例如,相比于接收一个字符串或数字类型的 order ID,你也可以请求一个真正的 Order 模型实例:

  1. Broadcast::channel('order.{order}', function ($user, Order $order) {
  2. return $user->id === $order->user_id;
  3. });

频道授权源码分析

授权路由

  1. class BroadcastManager implements FactoryContract
  2. {
  3. public function routes(array $attributes = null)
  4. {
  5. if ($this->app->routesAreCached()) {
  6. return;
  7. }
  8. $attributes = $attributes ?: ['middleware' => ['web']];
  9. $this->app['router']->group($attributes, function ($router) {
  10. $router->post('/broadcasting/auth', BroadcastController::class.'@authenticate');
  11. });
  12. }
  13. }

频道专门有 Controller 来处理授权服务:

  1. class BroadcastController extends Controller
  2. {
  3. public function authenticate(Request $request)
  4. {
  5. return Broadcast::auth($request);
  6. }
  7. }

Socket Io 服务器对 javascript 程序推送数据的时候,首先会经过该 controller 进行授权验证:

  1. public function auth($request)
  2. {
  3. if (Str::startsWith($request->channel_name, ['private-', 'presence-']) &&
  4. ! $request->user()) {
  5. throw new HttpException(403);
  6. }
  7. $channelName = Str::startsWith($request->channel_name, 'private-')
  8. ? Str::replaceFirst('private-', '', $request->channel_name)
  9. : Str::replaceFirst('presence-', '', $request->channel_name);
  10. return parent::verifyUserCanAccessChannel(
  11. $request, $channelName
  12. );
  13. }

verifyUserCanAccessChannel 根据频道与其绑定的闭包函数来验证该频道是否可以通过授权:

  1. protected function verifyUserCanAccessChannel($request, $channel)
  2. {
  3. foreach ($this->channels as $pattern => $callback) {
  4. if (! Str::is(preg_replace('/\{(.*?)\}/', '*', $pattern), $channel)) {
  5. continue;
  6. }
  7. $parameters = $this->extractAuthParameters($pattern, $channel, $callback);
  8. if ($result = $callback($request->user(), ...$parameters)) {
  9. return $this->validAuthenticationResponse($request, $result);
  10. }
  11. }
  12. throw new HttpException(403);
  13. }

由于频道的命名经常带有 userid 等参数,因此判断频道之前首先要把 channels 中的频道名转为通配符 *,例如 order.{userid} 转为 order.*,之后进行正则匹配。

extractAuthParameters 用于提取频道的闭包函数的参数,合并 $request->user() 之后调用闭包函数。

  1. protected function extractAuthParameters($pattern, $channel, $callback)
  2. {
  3. $callbackParameters = (new ReflectionFunction($callback))->getParameters();
  4. return collect($this->extractChannelKeys($pattern, $channel))->reject(function ($value, $key) {
  5. return is_numeric($key);
  6. })->map(function ($value, $key) use ($callbackParameters) {
  7. return $this->resolveBinding($key, $value, $callbackParameters);
  8. })->values()->all();
  9. }
  10. protected function extractChannelKeys($pattern, $channel)
  11. {
  12. preg_match('/^'.preg_replace('/\{(.*?)\}/', '(?<$1>[^\.]+)', $pattern).'/', $channel, $keys);
  13. return $keys;
  14. }
  15. public function validAuthenticationResponse($request, $result)
  16. {
  17. if (is_bool($result)) {
  18. return json_encode($result);
  19. }
  20. return json_encode(['channel_data' => [
  21. 'user_id' => $request->user()->getKey(),
  22. 'user_info' => $result,
  23. ]]);
  24. }

extractChannelKeys 用于将 order.{userid}order.23userid23 建立 keyvalue 关联。如果 useridUser 的主键,resolveBinding 还可以为其自动进行路由模型绑定。